package com.freedom.monitor.myeye.storage.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.TException;
import org.hbase.async.Config;
import org.hbase.async.HBaseClient;
import org.springframework.util.ResourceUtils;

import com.freedom.monitor.myeye.commmon.model.IOModel;
import com.freedom.monitor.myeye.commmon.model.KeyModel;
import com.freedom.monitor.myeye.commmon.model.SubKeyModel;
import com.freedom.monitor.myeye.commmon.utils.CompressUtils;
import com.freedom.monitor.myeye.commmon.utils.StringUtils;
import com.freedom.monitor.myeye.storage.utils.ConstantUtils;
import com.freedom.monitor.myeye.storage.utils.HBaseUtils;
import com.freedom.monitor.myeye.storage.utils.PropertyUtils;
import com.freedom.monitor.myeye.storage.utils.RedisUtils;
import com.freedom.rpc.thrift.common.utils.Logger;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;

// Phoenix    --->    http://www.cnblogs.com/smartloli/p/5047527.html
// AsyncHBase --->    http://wenku.baidu.com/link?url=IYJtde4n2uu7ftJuYfm-ULC8NQADYKdp4nZQ5Jnj8c9bAG3LWEXtIKWxoWgZEhaQhewQuy_7XDyo64Q2kxH8pnZK_zkbrSjY7S0iHYzFmiC
//            --->    https://github.com/OpenTSDB/asynchbase
//            --->    http://opentsdb.github.io/asynchbase/
//            --->    http://opentsdb.github.io/asynchbase/docs/build/html/index.html
//            --->    configuration options: http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html#properties
//            --->    configuration options: https://github.com/OpenTSDB/asynchbase/blob/master/asynchbase.conf
//            --->    http://www.ashishpaliwal.com/blog/2013/12/quick-start-with-asynchbase/
// hbase.regionserver.ipc.address=0.0.0.0 must be set; otherwise the region server always binds to the intranet IP.
// hbase.master.ipc.address=0.0.0.0 must be set as well.
/**
 * Implementation of {@code StorageService.Iface}.
 *
 * <p>
 * {@link #put(String)} decodes a JSON payload into {@code KeyModel} records and
 * writes them to HBase through {@code HBaseUtils}; {@link #getService(String)}
 * and {@link #getServerId(String, String)} read the lookup tables back.
 */
@SuppressWarnings("unused")
public class StorageServiceImpl implements StorageService.Iface {
	private static final Logger logger = Logger.getLogger(StorageServiceImpl.class);

	/** Shared JSON codec; Gson instances are thread-safe and can be reused across calls. */
	private static final Gson GSON = new GsonBuilder().create();

	/**
	 * Time-to-live of a data cell in milliseconds: roughly three months (3 * 31
	 * days). The leading {@code 3L} is essential: evaluated in {@code int}
	 * arithmetic the product (8,035,200,000) overflows to a negative value.
	 */
	private static final long DATA_TTL_MILLIS = 3L * 31 * 24 * 60 * 60 * 1000;

	/**
	 * Decodes one payload and stores its content.
	 *
	 * @param data JSON-serialized {@code IOModel}; its content field carries a JSON
	 *             list of {@code KeyModel}, optionally compressed
	 * @return {@code true} when the payload was decoded and the data table insert
	 *         succeeded; {@code false} on any validation, decoding or storage
	 *         failure (the failure is logged, never thrown)
	 */
	@Override
	public boolean put(String data) throws TException {
		logger.debug("StorageServiceImpl.put ---> " + data);
		try {
			// 0) Validate the raw payload.
			if (null == data) {
				throw new Exception("data is null");
			}
			// 1) Deserialize the envelope.
			IOModel ioModel = GSON.fromJson(data, new TypeToken<IOModel>() {
			}.getType());
			if (null == ioModel) {
				throw new Exception("ioModel --->  new Gson().fromJson(data, IOModel.class) fail ...");
			}
			// 2) Extract the envelope content.
			data = ioModel.getCt();
			if (null == data) {
				throw new Exception("data = ioModel.getCt() ---> null");
			}
			// 3) Undo compression when the envelope is flagged as compressed.
			if (ioModel.isCp()) {
				// NOTE(review): compared with != ; assumes getCa() is a primitive or
				// enum value, not a String -- confirm against IOModel.
				if (CompressUtils.KAFKA_LZ4 != ioModel.getCa()) {
					throw new Exception("unknown compress algorithm--->" + ioModel.getCa());
				}
				data = CompressUtils.deCompress(data);
				if (null == data) {
					throw new Exception("wrong happended ---> CompressUtils.deCompress(data)");
				}
			}
			// 4) Deserialize the content into List<KeyModel>.
			logger.debug(data);
			List<KeyModel> keyModels = GSON.fromJson(data, new TypeToken<ArrayList<KeyModel>>() {
			}.getType());
			if (null == keyModels) {
				throw new Exception("keyModels is null,wrong");
			}
			// 5) Store the records.
			if (!put0(keyModels)) {
				throw new Exception("wrong with ---> handle(keyModels)");
			}
			return true;
		} catch (Exception e) {
			// Service boundary: every failure is reported to the caller as false.
			logger.error(e.toString());
			return false;
		}
	}

	/**
	 * Builds one data-table Put per sub key, inserts them, then registers the
	 * product/service, product/service/server and product/service/server/subKey
	 * combinations in their lookup tables.
	 *
	 * @param keyModels decoded records, never {@code null}
	 * @return {@code false} when the batch contains a null entry or the data table
	 *         insert fails; {@code true} otherwise. Lookup-table failures are
	 *         logged but do not change the result.
	 */
	private boolean put0(List<KeyModel> keyModels) {
		List<Put> puts = new ArrayList<Put>();// all data-table rows of this batch
		Set<String> productServiceSet = new HashSet<String>();
		Set<String> productServiceServeridSet = new HashSet<String>();
		Set<String> productServiceServeridSubkeySet = new HashSet<String>();
		logger.debug("---------------------------------------------");
		for (KeyModel keyModel : keyModels) {
			if (null == keyModel) {
				// Reject explicitly instead of failing later with a bare NullPointerException.
				logger.error("put0 ---> null KeyModel in batch, reject");
				return false;
			}
			String productName = keyModel.getPn();// product
			String serviceName = keyModel.getSn();// service
			String serverId = keyModel.getSid();// server id
			String sampleTime = keyModel.getSt();// sample time
			List<SubKeyModel> subKeyModels = keyModel.getKey();// sub key list
			if (null == subKeyModels) {
				logger.error("put0 ---> null sub key list, reject ---> "
						+ StringUtils.unionByMagicKey(productName, serviceName, serverId));
				return false;
			}
			for (SubKeyModel subKeyModel : subKeyModels) {
				if (null == subKeyModel) {
					logger.error("put0 ---> null SubKeyModel, reject ---> "
							+ StringUtils.unionByMagicKey(productName, serviceName, serverId));
					return false;
				}
				String subKey = subKeyModel.getSk();// sub key
				// Row key layout may change later (note carried over from the original code).
				String rowKey = StringUtils.unionByMagicKey(productName, serviceName, serverId, subKey, sampleTime);
				puts.add(buildDataPut(rowKey, subKeyModel));
				// Remember every combination seen so the lookup tables can be updated.
				productServiceSet.add(StringUtils.unionByMagicKey(productName, serviceName));
				productServiceServeridSet.add(StringUtils.unionByMagicKey(productName, serviceName, serverId));
				productServiceServeridSubkeySet
						.add(StringUtils.unionByMagicKey(productName, serviceName, serverId, subKey));
			}
		}
		if (!HBaseUtils.insertIntoTable(ConstantUtils.TABLE_DATA, puts)) {
			return false;
		}
		//
		// Lookup tables are updated on demand. A failure here is only logged:
		// put1 does not mark failed keys in Redis, so they are attempted again
		// the next time they are seen.
		logger.debug("begin to put1 " + ConstantUtils.TABLE_SERVICE);
		logger.debug("productServiceSet ---> " + productServiceSet);
		if (!put1(productServiceSet, ConstantUtils.TABLE_SERVICE)) {
			logger.error("put1 fail ---> " + ConstantUtils.TABLE_SERVICE);
		}
		logger.debug("end to put1 " + ConstantUtils.TABLE_SERVICE);
		//
		logger.debug("begin to put1 " + ConstantUtils.TABLE_SERVER_ID);
		if (!put1(productServiceServeridSet, ConstantUtils.TABLE_SERVER_ID)) {
			logger.error("put1 fail ---> " + ConstantUtils.TABLE_SERVER_ID);
		}
		logger.debug("end to put1 " + ConstantUtils.TABLE_SERVER_ID);
		//
		if (!put1(productServiceServeridSubkeySet, ConstantUtils.TABLE_SUB_KEY)) {
			logger.error("put1 fail ---> " + ConstantUtils.TABLE_SUB_KEY);
		}
		//
		return true;
	}

	/**
	 * Builds the data-table Put holding the five counters of one sub key.
	 *
	 * @param rowKey      row key of the cell group
	 * @param subKeyModel source of the counters, never {@code null}
	 * @return a Put with the counters and {@link #DATA_TTL_MILLIS} applied
	 */
	private static Put buildDataPut(String rowKey, SubKeyModel subKeyModel) {
		// Assign to long locals first so Bytes.toBytes(long) is always the overload
		// chosen, whatever the getter types are (they are not visible here).
		long totalCount = subKeyModel.getTc();// total calls
		long totalSucceed = subKeyModel.getTs();// total successful calls
		long totalCostTime = subKeyModel.getTct();// total cost time
		long maxCostTime = subKeyModel.getMaxct();// max cost time
		long minCostTime = subKeyModel.getMinct();// min cost time
		byte[] family = Bytes.toBytes(ConstantUtils.FAMILY1);
		Put put = new Put(Bytes.toBytes(rowKey));
		put.addColumn(family, Bytes.toBytes(ConstantUtils.FIELD_TOTAL_COUNT), Bytes.toBytes(totalCount));
		put.addColumn(family, Bytes.toBytes(ConstantUtils.FIELD_TOTAL_SUCCEED), Bytes.toBytes(totalSucceed));
		put.addColumn(family, Bytes.toBytes(ConstantUtils.FIELD_TOTAL_COST_TIME), Bytes.toBytes(totalCostTime));
		put.addColumn(family, Bytes.toBytes(ConstantUtils.FIELD_MAX_COST_TIME), Bytes.toBytes(maxCostTime));
		put.addColumn(family, Bytes.toBytes(ConstantUtils.FIELD_MIN_COST_TIME), Bytes.toBytes(minCostTime));
		put.setTTL(DATA_TTL_MILLIS);
		return put;
	}

	/**
	 * Inserts into {@code tableName} the keys that {@code RedisUtils.findNotExist}
	 * reports as missing, then records them via
	 * {@code RedisUtils.setWhenNotExist} to speed up the next call.
	 *
	 * @param keys      candidate row keys
	 * @param tableName target lookup table
	 * @return {@code false} only when the HBase insert fails
	 */
	private static boolean put1(Set<String> keys, String tableName) {
		logger.debug("tableName --->" + tableName + " serviceSet ---> " + keys);
		Set<String> missingKeys = RedisUtils.findNotExist(keys);
		if (missingKeys.isEmpty()) {
			logger.debug("nothing to do with " + tableName);
			return true;
		}
		// Write the missing keys to HBase.
		List<Put> puts = new ArrayList<Put>();
		for (String key : missingKeys) {
			Put put = new Put(Bytes.toBytes(key));
			put.addColumn(Bytes.toBytes(ConstantUtils.FAMILY1), Bytes.toBytes(ConstantUtils.FIELD_ZERO),
					Bytes.toBytes(ConstantUtils.FIELD_EMPTY));
			puts.add(put);
		}
		if (!HBaseUtils.insertIntoTable(tableName, puts)) {
			return false;
		}
		// Record the keys in Redis only after HBase accepted them.
		RedisUtils.setWhenNotExist(missingKeys);
		logger.debug("update some thing with " + tableName);
		return true;
	}

	/**
	 * Looks up the entries of the service table for a product.
	 *
	 * @param productCode product code used as the query argument
	 * @return a result flagged as failed (with an error message) when the code is
	 *         invalid or the query throws; otherwise a successful result carrying
	 *         the entries found, possibly an empty list
	 */
	@Override
	public ServiceResult getService(String productCode) throws TException {
		logger.debug("getService invoked...");
		ServiceResult result = new ServiceResult();
		//
		// 1) Validate productCode.
		if (!StringUtils.valid(productCode)) {
			result.setSucceed(false);
			result.setErrorMsg("invalid product code...");
			return result;
		}
		// 2) Query HBase.
		Set<String> serviceSet;
		try {
			serviceSet = HBaseUtils.queryTips(ConstantUtils.TABLE_SERVICE, productCode);
		} catch (Exception e) {
			logger.error(e.toString());
			result.setSucceed(false);
			result.setErrorMsg("query hbase fail...");
			return result;
		}
		// 3) Nothing found (a null answer is treated like an empty one).
		if (null == serviceSet || serviceSet.isEmpty()) {
			result.setSucceed(true);
			result.setServiceList(new ArrayList<String>());
			return result;
		}
		// 4) Copy the entries into the result.
		result.setSucceed(true);
		logger.debug("-----------------------------");
		for (String service : serviceSet) {
			result.addToServiceList(service);
			logger.debug("add service--->" + service);
		}
		return result;

	}

	/**
	 * Looks up the entries of the server id table for a product/service pair.
	 *
	 * @param productCode product code
	 * @param serviceCode service code
	 * @return a result flagged as failed (with an error message) when a code is
	 *         invalid or the query throws; otherwise a successful result carrying
	 *         the entries found, possibly an empty list
	 */
	@Override
	public ServerIdResult getServerId(String productCode, String serviceCode) throws TException {
		logger.debug("get ServerId invoked...");
		ServerIdResult result = new ServerIdResult();
		//
		// 1) Validate productCode/serviceCode.
		if (!StringUtils.valid(productCode, serviceCode)) {
			result.setSucceed(false);
			result.setErrorMsg("invalid product code/service code...");
			return result;
		}
		// 2) Query HBase.
		Set<String> serverIdSet;
		try {
			serverIdSet = HBaseUtils.queryTips(ConstantUtils.TABLE_SERVER_ID,
					StringUtils.unionByMagicKey(productCode, serviceCode));
		} catch (Exception e) {
			logger.error(e.toString());
			result.setSucceed(false);
			result.setErrorMsg("query hbase fail...");
			return result;
		}
		// 3) Nothing found (a null answer is treated like an empty one).
		if (null == serverIdSet || serverIdSet.isEmpty()) {
			result.setSucceed(true);
			result.setServerIdList(new ArrayList<String>());
			return result;
		}
		// 4) Copy the entries into the result.
		result.setSucceed(true);
		logger.debug("-----------------------------");
		for (String serverId : serverIdSet) {
			result.addToServerIdList(serverId);
			logger.debug("add server ID--->" + serverId);
		}
		return result;
	}

}